Cloud Data FusionのチュートリアルでBigQueryにデータを作ってみる
はじめに
データアナリティクス事業本部のkobayashiです。
クラスメソッド BigQuery Advent Calendar 2020 の8日目のエントリになります。
データパイプラインGCPのフルマネージドサービスであるCloud Data Fusionは気になっていたサービスなのですが、なかな時間を取って調査することができずにいました。今回その機会ができたので公式ドキュメントに記載されているチュートリアルを実践して概要を掴みます。
Cloud Data Fusionとは
Cloud Data Fusionはフルマネージドサービスで迅速にデータパイプラインを構築・管理できるデータ統合サービスです。ノンコーディングでETL、ELTデータパイプラインを構築・管理できるので、データ アナリストやビジネスユーザーもデータを管理できるようになります。
特徴として主なところを挙げると以下になります。
- CDAP というOSSベースのマネージドサービス
- GUI上の直感的な操作で作成
- GCP上のデータ、GCP以外の様々なデータソースに対応
- バッチとストリーミング処理が可能
- DataprocやGCP以外のHadoopの環境でも実行可能
- データリネージュ、統合メタデータなどのデータ管理機能
Cloud Data Fusionの主な機能
パイプライン
GUIで、ノード(コンポーネント)を接続してデータパイプライン(DAG)を作成できます。
ノードの種類としては、
- Sources
- Transforms
- Analytics
- Actions
- Sinks
- Error Handling
があり、これらを組み合わせてデータパイプラインを作成します。
Wrangler UI
実際のデータを確認しながらインタラクティブに変換を作成できます。Wrangler UIで作成した変換はTransformとしてパイプラインに組み込めます。
メタデータ
データセットのメタデータを管理し、簡単にデータセットの検索を行えます。
リネージュ
データセットレベルと項目レベルでデータどのパイプラインで使われているかを可視化してトラッキングが行えます。
HUB
再利用可能なコンポーネント(各種データベースのドライバ、AWS、Azureなどのクラウドサービスへのコネクタ)やサンプルパイプラインなどが利用できます。
まだまだ説明し足りないですが、ここでチュートリアルを実践してみます。
パイプラインの作成
実践するチュートリアルは ターゲティング キャンペーン パイプライン | Google Cloud を行ってみます。
これは、CSVファイルとBigQueryにあるデータを統合してデータを作成し、そのデータをBigQueryへ新たなテーブルとして書き込むパイプラインになります。
最終的に作成するパイプラインは以下になります。
事前準備
Cloud Data Fusionでパイプラインを作成して実行するためには、Cloud Data FusionとCloud DataprocのAPIが有効にする必要があります。また今回はCloud Strage、BigQueryも使うのでそれらのAPIも有効にする必要があります。
次にCloud Data Fusionインスタンスを作成する必要があります。こちらは公式ドキュメンで詳しく説明されていますのでそちらを実施します。 Cloud Data Fusion インスタンスの作成 | Google Cloud
権限設定
当初、上記の事前準備だけでデータパイプラインを実行できると考えパイプラインの構築を進めたのですが、パイプライン作成して実行するとすぐにエラーになってしまい、詳細ログを確認すると下記のエラーが発生していました。
PROVISION task failed in REQUESTING_CREATE state for program run program_run:default.CampaignPipeline_v1.-SNAPSHOT.workflow.DataPipelineWorkflow.xxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxx due to Dataproc operation failure: INVALID_ARGUMENT: User not authorized to act as service account 'xxxxxxxxxxxx-compute@developer.gserviceaccount.com'. To act as a service account, user must have one of [Owner, Editor, Service Account Actor] roles. See https://cloud.google.com/iam/docs/understanding-service-accounts for additional details..
パイプラインの構築途中でPreviewで実行確認ができるのですが、この時点ではエラーがでていなかったため気づくのに時間がかかってしまいました。これは権限設定が足りないため発生するエラーできちんとドキュメントを読めば記載があったのですが、当初読み飛ばしていました。
Cloud Data Fusion サービス アカウント | Cloud Data Fusion のドキュメント
従って、事前準備に加え権限設定を予め行います。設定方法は サービス アカウントの権限借用の管理 | Google Cloud に沿って進めます。
手順1)IAMと管理
コンソールに移動し、IAM
を選択する。
手順2)画面右のGoogle提供のロール付与を含みます
をチェックし、service-{プロジェクト番号}@gcp-sa- datafusion.iam.gserviceaccount.com
のメンバーを選択する。
手順3)ロールの編集画面でサービスアカウントユーザー(roles/iam.serviceAccountUser)のロールを付与する。
service-{プロジェクト番号}@gcp-sa- datafusion.iam.gserviceaccount.com
のメンバーのロールにサービスアカウントユーザーが追加されていればパイプラインの実行が行えます。
ここまでで準備が整ったのでパイプラインを作成していきます。
Wrangler UIによるデータ変換
手順1)初めに上記で作成したCloud Data Fusionインスタンスでインスタンスを表示
を押下し、Cloud Data Fusionのホーム画面へ進む。
手順2)Wrangle
を選択してWrangler
へ進む。
手順3)左のメニューからGoogle Cloud Storage > Sample Buckets
を選択し、campaign-tutorial
配下のcustomer.csv
を選択する。
手順4)ファイルの中身がbody
列に表示されるので、body
列のプルダウンをクリックしParse > CSV
を選択し、Comma
で分割する。
手順5)左端のbody
列は必要ないので、body
列のプルダウンをクリックしDelete column
を選択し、削除する。
手順6)カラム名をわかりやすいものに変えるため、右ペインの全選択を押下し全カラムを選択状態にしたあと、Column names > Set all
を選択するとモーダルが表示されるのでName,StreetAddress,City,State,Country
を入力しApply
を押下する。
ここでWranglerの目玉機能の一つであるInsightsでデータの中身を確認してみます。画面中央のInsights
を選択すると下図の様な画面が開き、1000個分のサブセットデータが表示されます。これにより変換やフィルタ条件を見積もることができ大変便利です。
手順7)City
列の値を使ってフィルタリングを行うため、City
列のプルダウンをクリックしFilter > Keep rows > value matches regex
を選択し、テキストボックスに^(California|Oregon|Washington)$
を入力しApply
を押下する。
*これでCity
列がCalifornia、Oregon、Washingtonだけのデータになります
手順8)StyreetAddress
列の値を使ってフィルタリングを行うため、StyreetAddress
列のプルダウンをクリックしFilter > Keep rows > value contains
を選択し、テキストボックスにAvenue
を入力しApply
を押下する。
*StyreetAddress
列にAvenue
が含まれているデータになります。
これでWranglerを使ってのデータ変換は完了です。これをパイプラインのソースとして組み込むので右上の+
を押下します。
Pipeline Studioによるパイプライン作成
下図がWranglerから遷移したPipeline Studioの画面になります。これにノードを付け足して行きパイプラインを完成させます。
手順1)GCSFile
ノードの上にマウスカーソルを合わせるとProperties
が表示されるので押下し、Wranglerでの設定を確認する。
DirectivesのRecipeにWranglerで行った操作が表示されます。またWrangle
ボタンを押下するとWrangler経戻り再度変換設定を行えます。
次にCSVファイルのデータと結合するBigQueryのstate_abbreviations
データをソースとして設定しますが、その前にBigQueryでデータの中身を確認します。
BigQueryに移動しクエリエディタで次のクエリを実行します。
SELECT * FROM `dis-user-guide.campaign_tutorial.state_abbreviations`
すると、上図のようなデータが表示されます。これをCSVのデータと結合します。
手順2)左のメニューのSource > BigQuery
を選択し、BigQuery
ノードを作成しProperties
を押下しBigQueryをデータソスとする設定を行う。
- Reference Name :
state_abbreviations
を入力 - Dataset Project ID Name :
dis-user-guide
を入力 - Dataset :
campaign_tutorial
を入力 - Table :
state_abbreviations
を入力
手順3)左のメニューのAnalytics > Joiner
を選択し、Wrangler
ノードとBigQuery
ノードを接続する。
手順4)Joiner
ノードのProperties
を押下し、Joinerの設定を行う。
*行っていることとしてはWranglerから出力されたレコードとBigQueryから出力されたレコードをStateとnameで外部結合しています。
- Fields
- Wrangler
- Name : チェックする
- StreetAddress : チェックする
- City : チェックする
- State : チェックしない
- BigQuery
- name : チェックしない
- abbreviation : チェックする
- Wrangler
- Join Type :
Outer
を選択- Requiree Inputs :
Wrangler
をチェック
- Requiree Inputs :
- Join Condition
- Wrangler :
State
- BigQuery :
name
- Wrangler :
手順5)左のメニューのSink > BigQuery
を選択し、Joiner
ノードを接続する。
手順6)BigQuery
ノードのProperties
を押下し、SinkのBigQuery設定を行う。
- Reference Name :
customer_data_abbreviated_states
を入力 - Dataset :
dis_user_guide
を入力 - Table :
customer_data_abbreviated_states
を入力
パイプラインの確認
Cloud Data FusionにはPreview機能があり、作成しているパイプラインをテストして実行に問題がなく想定通りかを確認することができます。ここまで作成したパイプラインでもこの機能を使って実行結果を確認してみます。
手順1)Pipeline Studioの画面上部のPreview
を押下する。
手順2)Preview
画面のRun
を押下してPreviewを実行する。
実行中は下図のようになり実行時間が表示されます。
Previewが完了すると下図のような画面になり、ログと各ノードでのデータが確認できます。ここでパイプラインが想定通りなのかを確認したり、エラー終了した際にはログを確認してパイプラインの修正を行えます。
Previewで問題がなければPipeline Studioの画面上部のDeploy
を押下してパイプラインをデプロイします。
パイプラインの実行
デプロイが終わるとパイプラインを実行することができるようになリます。
パイプラインの実行に問題がなければStatusがProvisioning、Running、Successと遷移します。
Successが表示されデータパイプラインの実行が終わると下図のような画面になります。各ノードで処理されたレコード数とエラー数が表示され、これをクリックすると詳細な情報を見ることができます。
実行結果をBigQueryで確認すると変換されたデータが登録されていることがわかります。
パイプラインのエクスポート
作成したパイプラインはJson形式でエクスポートが行えます。これを利用すれば他のプロジェクトで同じパイプラインをインポートしたり、Gitなどでバージョン管理をすることができます。
今回作成したパイプラインを出力してみると以下のような形になります。
{ "name": "CampaignPipeline", "description": "Data Pipeline Application", "artifact": { "name": "cdap-data-pipeline", "version": "6.2.3", "scope": "SYSTEM" }, "config": { "resources": { "memoryMB": 2048, "virtualCores": 1 }, "driverResources": { "memoryMB": 2048, "virtualCores": 1 }, "connections": [ { "from": "GCSFile", "to": "Wrangler" }, { "from": "Wrangler", "to": "Joiner" }, { "from": "Joiner", "to": "BigQuery1" }, { "from": "BigQuery", "to": "Joiner" } ], "comments": [], "postActions": [], "properties": {}, "processTimingEnabled": true, "stageLoggingEnabled": true, "stages": [ { "name": "GCSFile", "plugin": { "name": "GCSFile", "type": "batchsource", "label": "GCSFile", "artifact": { "name": "google-cloud", "version": "0.15.3", "scope": "SYSTEM" }, "properties": { "filenameOnly": "false", "copyHeader": "false", "schema": "{\"type\":\"record\",\"name\":\"text\",\"fields\":[{\"name\":\"body\",\"type\":\"string\"}]}", "path": "gs://campaign-tutorial/customers.csv", "format": "text", "project": "xxxxxxxxx", "recursive": "false", "referenceName": "campaign-tutorial.customers.csv" } }, "outputSchema": "{\"type\":\"record\",\"name\":\"text\",\"fields\":[{\"name\":\"body\",\"type\":\"string\"}]}", "id": "GCSFile", "type": "batchsource", "label": "GCSFile", "icon": "fa-plug" }, .... "schedule": "0 * * * *", "engine": "spark", "numOfRecordsPreview": 100, "description": "Data Pipeline Application", "maxConcurrentRuns": 1 } }
メタデータとリネージュ
メータデータとリネージュも確認してみます。
手順1)Cloud Data Fusionのホーム画面よりMetadata
を選択する。
手順2)テキストボックスに検索したい項目名を入力し検索ボタンを押下する。
*ここでの検索はスキーマ名、フィールド名、タグ等様々なメタデータで検索が行なえます。
検索結果が表示されるので、確認したいデータセットを選択し詳細を確認します。
またLineage
を選択するとそのデータセットがどのパイプラインでいつ使われ、どこへデータが流れていったかを確認することができます。
まとめ
Cloud Data Fusionを使ってデータパイプラインを構築し実行してみました。Cloud Data Fusionはノンコーディングで視覚的にデータパイプラインを作成できるので操作さえなれてしまえば複雑な処理も簡単に組むことができます。またメタデータとリネージュも確認しました。
今回はGCP内のデータソスでしたが、次回はHUB機能を使い他のクラウドサービスをデータソースとするパイプラインを作成してみたいと思います。
最後まで読んで頂いてありがとうございました。
クラスメソッド BigQuery Advent Calendar 2020 9日目は、しんや さんです。お楽しみに。